package com.bw.realtime_common.base;


import com.bw.realtime_common.util.FlinkSourceUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import static org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;

/**
 * @BelongsProject: gmall_lx
 * @BelongsPackage: com.bw.realtime_common.base
 * @Author: cuizexuan
 * @CreateTime: 2025-06-19  21:59
 * @Description: TODO
 * @Version: 1.0
 */
public abstract class BaseApp {
    public void start(String topicDb,String groupId,int p,int port){
        //设置hadoop执行用户
        System.setProperty("HADOOP_USER_NAME","root");
        //创建流环境
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",port);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 3.设置并行度
        env.setParallelism(p);
        // 4.设置CK
        env.setStateBackend(new HashMapStateBackend());
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop101:8020/gmall/stream/"+groupId );
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION);
        //消费数据
        DataStreamSource<String> dataStreamSource = env.fromSource(FlinkSourceUtil.getKafkaSource(topicDb, groupId), WatermarkStrategy.noWatermarks(), "Kafka Source");
        //打印数据
//        dataStreamSource.print();
        handle(env,dataStreamSource);
        //执行
        try {
            env.execute();
        }catch (Exception e){
            e.printStackTrace();
        }

    }

    public abstract  void  handle(StreamExecutionEnvironment env,DataStreamSource<String> dataStreamSource);
}
